compute: extend temporal bucketing to non-ArrangeBy arrangements #36644
ggevay wants to merge 5 commits into
#36648 is meant to supersede this PR. Edit: Actually, I've now updated this PR in case we want to move forward with this one.
fec2af8 introduced temporal bucketing on `PlanNode::ArrangeBy`: the lowering picks an `ArrangementStrategy` from a `has_future_updates` analysis bit, and `ensure_collections` inserts a bucketing operator in front of the arrangement when the strategy is `TemporalBucketing`.

Of the other bucketing-eligible operators, only `Threshold` inherits this coverage transitively, because its lowering wraps the input in a synthetic `ArrangeBy` whose target is `new_arranged([required])` (so `ensure_collections` actually inserts a bucket op there). `TopK` and `Negate` also wrap their inputs, but the wrap targets `new_raw()` (i.e., no actual arrangement), so `ensure_collections` sees `will_create_arrangement = false` and the wrap's strategy is decorative; neither operator is bucketed today. `Reduce` is left out of both hooks: its LIR node has no strategy field, and its render path (`render_reduce` via `KeyValPlan`) bypasses `ensure_collections`. As a result, a `Temporal Filter -> GroupAggregate` pattern (where no `ArrangeBy` sits between the temporal MFP and the reduce) is never bucketed today.

Add a `temporal_bucketing_strategy: ArrangementStrategy` field to the `Reduce` LIR node and set it during lowering from the input's `has_future_updates` flag. At render time, `render_reduce` applies `apply_bucketing_strategy` to the `(key, val)` stream right before `render_reduce_plan` arranges it internally.

The lowering also clears `LoweredExpr::has_future_updates` on bucketing absorption, so a stack of bucketing-eligible operators only buckets at the lowest one; a trailing temporal MFP fused above naturally re-arms the flag. The render-side `bucketed: bool` safety net in `ensure_collections` is still needed to prevent double-bucketing within a single call (the raw-collection site vs. the `collections.arranged` loop, or across multiple keys in that loop).

TODO: Union with consolidation, TopK. See next commit.
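The absorption and re-arming behavior described in this commit message can be modeled in a few lines. The following is a minimal sketch, not the PR's code: `Op` and `lower_stack` are hypothetical names, `ArrangementStrategy` is reduced to the two variants that appear in the diff, and the body of `strategy_from_future` is an assumption (the real function may consult feature flags or other state).

```rust
/// Simplified stand-in for the PR's `ArrangementStrategy` (two variants assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ArrangementStrategy {
    Direct,
    TemporalBucketing,
}

/// Assumed body: bucket exactly when the input may carry future updates.
fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
    if has_future_updates {
        ArrangementStrategy::TemporalBucketing
    } else {
        ArrangementStrategy::Direct
    }
}

/// Hypothetical model of one bucketing-eligible operator in a linear plan.
struct Op {
    name: &'static str,
    /// True if a temporal MFP is fused on top of this operator's output.
    trailing_temporal_mfp: bool,
}

/// Lowers `ops` bottom-up. Returns each operator's strategy and the
/// `has_future_updates` flag of the stack's final output.
fn lower_stack(
    input_future: bool,
    ops: &[Op],
) -> (Vec<(&'static str, ArrangementStrategy)>, bool) {
    let mut has_future_updates = input_future;
    let mut strategies = Vec::with_capacity(ops.len());
    for op in ops {
        let strategy = strategy_from_future(has_future_updates);
        // Bucketing absorption: once this operator buckets, clear the flag.
        if strategy == ArrangementStrategy::TemporalBucketing {
            has_future_updates = false;
        }
        // A trailing temporal MFP re-arms the flag for the operators above.
        has_future_updates |= op.trailing_temporal_mfp;
        strategies.push((op.name, strategy));
    }
    (strategies, has_future_updates)
}
```

In this model, a `Reduce` followed by a `TopK` over a temporal filter buckets only at the `Reduce`; if a temporal MFP is fused on top of the `Reduce`, the `TopK` above it buckets as well.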
Follow-up to the Reduce-only temporal-bucketing commit:

- TopK: add `temporal_bucketing_strategy: ArrangementStrategy` to `PlanNode::TopK` and `RenderPlan::Expr::TopK`. Lowering populates it via `strategy_from_future(input_future)` and clears `has_future_updates` when `TemporalBucketing` is chosen. `render_topk` calls `apply_bucketing_strategy` on the input stream before building the Top-K arrangement(s). Note that this is not ideal for `MonotonicTop1Plan { must_consolidate: false }`, where we add temporal bucketing with no downstream consolidator; accepted for now and noted in a comment. Later, we could move the temporal bucketing decision into a new LIR pass.
- Union: add `temporal_bucketing_strategies: Vec<ArrangementStrategy>` to `PlanNode::Union` and `RenderPlan::Expr::Union`, lockstep with `inputs`. The renderer applies `apply_bucketing_strategy` per leg before `concatenate`/`consolidate_named`. Per-leg strategies are non-`Direct` only when the Union consolidates its output.
- Threshold: clear `has_future_updates` when the conditional `arrange_by` wrap actually fires with `TemporalBucketing`; the synthesized `ArrangeBy` (built with `new_arranged`) installs the bucket op at render time via `ensure_collections`.
- Fold the `refine_union_negate_consolidation` LIR pass into lowering, because we need to know about this when the lowering is deciding about temporal bucketing. (Later, we could move the temporal bucketing decision into a new LIR pass.)
- Propagate the new fields through EXPLAIN text/json, the `PlanNode`/`RenderPlan` interpreter dispatch, and physically-monotonic interpretation.
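
The Union rule above (one strategy per leg, non-`Direct` only when the Union consolidates its output) can be sketched as follows. This is an illustrative model under assumptions: `union_leg_strategies` is a hypothetical helper, the per-leg `has_future_updates` bits are passed in as plain booleans, and `ArrangementStrategy` is reduced to the two variants visible in the diff.

```rust
/// Simplified stand-in for the PR's `ArrangementStrategy` (two variants assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ArrangementStrategy {
    Direct,
    TemporalBucketing,
}

/// Hypothetical helper: returns one strategy per Union input, in the same
/// order as the inputs, so the result stays in lockstep with them.
fn union_leg_strategies(
    input_futures: &[bool],
    consolidate_output: bool,
) -> Vec<ArrangementStrategy> {
    input_futures
        .iter()
        .map(|&leg_has_future_updates| {
            // A leg is bucketed only if the Union consolidates its output
            // and that leg may carry future updates.
            if consolidate_output && leg_has_future_updates {
                ArrangementStrategy::TemporalBucketing
            } else {
                ArrangementStrategy::Direct
            }
        })
        .collect()
}
```

Without output consolidation every leg stays `Direct`, which matches the statement that per-leg strategies are non-`Direct` only when the Union consolidates.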

EXPLAIN tests (mirroring commit 623f53d for Reduce):

- src/compute-types/src/explain/text.rs prints `temporal_bucketing_strategy=...` for TopK and `temporal_bucketing_strategies=[...]` for Union when any strategy is non-`Direct`.
- test/sqllogictest/temporal_bucketing.slt: TopK firing (LIMIT over a temporal filter), TopK idempotency (over an already-bucketed indexed view), Union firing (EXCEPT ALL over two temporal filters).
- test/sqllogictest/explain/physical_plan_as_json.slt: expected JSON updated to include the new fields on TopK and Union nodes.
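
The "print only when non-`Direct`" behavior for the EXPLAIN text output can be illustrated with a small sketch. The helper names `topk_annotation` and `union_annotation` are hypothetical, and the way each strategy value is rendered (here via `Debug`, comma-separated) is an assumption; only the `temporal_bucketing_strategy=` and `temporal_bucketing_strategies=[...]` prefixes come from the commit message.

```rust
/// Simplified stand-in for the PR's `ArrangementStrategy` (two variants assumed).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ArrangementStrategy {
    Direct,
    TemporalBucketing,
}

/// Hypothetical helper: annotation for a TopK node, or `None` for `Direct`.
fn topk_annotation(strategy: ArrangementStrategy) -> Option<String> {
    match strategy {
        ArrangementStrategy::Direct => None,
        other => Some(format!("temporal_bucketing_strategy={:?}", other)),
    }
}

/// Hypothetical helper: annotation for a Union node, or `None` when every
/// leg is `Direct`. All legs are listed so positions match the inputs.
fn union_annotation(strategies: &[ArrangementStrategy]) -> Option<String> {
    if strategies
        .iter()
        .all(|s| *s == ArrangementStrategy::Direct)
    {
        return None;
    }
    let rendered: Vec<String> = strategies.iter().map(|s| format!("{:?}", s)).collect();
    Some(format!(
        "temporal_bucketing_strategies=[{}]",
        rendered.join(", ")
    ))
}
```

Suppressing the annotation in the all-`Direct` case keeps existing EXPLAIN output unchanged for plans that never bucket.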
antiguru left a comment
I read most of it, but it's very dense. Overall, I have some nits about comments that aren't accurate, and the change mixes in some unrelated changes (inlining refine_union_negate, explain changes). It seems to do the right thing.
The big issue I see with this change is that it introduces complexity, which we shouldn't have in the first place. It's not just caused by this PR, as my change to make temporal bucketing a property of LIR laid the groundwork for this. With this change, however, it becomes very apparent that this is the wrong approach and we need an alternative.
So, a way forward could be to merge this change and then commit to spending time on implementing an alternative that localizes complexity instead of spreading it all around. The idea would be to make temporal bucketing a cluster-wide setting that one needs to opt in to, or to make it so good that it doesn't increase cost. Let's discuss this approach, and how to move forward with this change.

    // TODO(temporal-bucketing): `has_future_updates` is computed per
    // dataflow; we don't currently propagate it across `Id::Global`
    // boundaries (e.g., from an MV's dataflow to its consumer's), so a
    // downstream-only `Get`-then-`ArrangeBy` won't bucket unless the
    // consumer has its own local temporal MFP.

Correct the comment please. MVs never produce future updates, indexes don't either.

    //
    // `TopK` itself buckets (via `apply_bucketing_strategy` at the
    // top of `render_topk`) when the strategy says so, so the
    // output flag is cleared whenever bucketing actually fires.
    //
    // Bucketing absorption: see `strategy_from_future`.
    let temporal_bucketing_strategy = strategy_from_future(input_future);
    let has_future_updates = match temporal_bucketing_strategy {
        ArrangementStrategy::TemporalBucketing => false,
        ArrangementStrategy::Direct => input_future,
    };

I'm not sure I follow this part: shouldn't the output of `TopK` be marked as `has_future_updates` only if it has an MFP at the end with a temporal filter, rather than based on its inputs?

    // actually bucket. If the wrap is skipped, the input already
    // had a suitable arrangement and an upstream operator was
    // responsible for bucketing.
    let wrap_fires = !keys

Can we rename the variable? `wrap_fires` only makes sense after reading the comment.

    ///
    /// Convention: every caller that returns `TemporalBucketing` must also clear
    /// `LoweredExpr::has_future_updates` on the resulting `LoweredExpr`, so that a stack of
    /// bucketing-eligible operators only buckets at the lowest one. A trailing temporal MFP
    /// fused on top naturally re-arms the flag.

I don't think this is true in general? We can make it more specific: any arrangement that absorbs data that can have future updates should be guarded by a temporal bucketing operator. The output of an operator can have future updates if it applies a non-safe MFP.

    // Fold the deleted `refine_union_negate_consolidation` pass in
    // here: a Union with any `Negate` input should consolidate its
    // output. The lowering is the only place where this decision
    // can be coupled with the per-input bucketing strategy.
    let has_negate_input = lowered_inputs
        .iter()
        .any(|l| matches!(l.plan.node, PlanNode::Negate { .. }));
    let consolidate_output =
        self.enable_consolidate_after_union_negate && has_negate_input;

Can we not fold `enable_consolidate_after_union_negate` as part of this PR?
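
For reference, the decision under discussion is a small predicate over the Union's inputs. The sketch below restates it in self-contained form; the `PlanNode` enum is a hypothetical, heavily reduced stand-in for the real LIR type, and `union_consolidates_output` is an illustrative name that does not appear in the PR.

```rust
/// Hypothetical, heavily reduced stand-in for the LIR `PlanNode`.
enum PlanNode {
    Get,
    Negate { input: Box<PlanNode> },
}

/// Returns true if the Union should consolidate its output: the feature
/// flag must be enabled and at least one direct input must be a `Negate`.
fn union_consolidates_output(
    enable_consolidate_after_union_negate: bool,
    inputs: &[PlanNode],
) -> bool {
    let has_negate_input = inputs
        .iter()
        .any(|node| matches!(node, PlanNode::Negate { .. }));
    enable_consolidate_after_union_negate && has_negate_input
}
```

Because this model takes only the inputs and a flag, the same predicate could in principle live either in lowering or in a separate pass, which is the trade-off the comment above raises.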

    D: timely::ExchangeData
        + crate::typedefs::MzData
        + Ord
        + Clone
        + std::fmt::Debug
        + differential_dataflow::Hashable;

Can be simplified by using differential_dataflow::Data here.
See commit messages.